package com.itbac.lock.zookeeperLock;

import com.google.common.base.Charsets;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

/**
 *  zk分布式锁
 * 临时顺序节点，解决惊群效应问题。
 * 使用 I0Itec.zkclient.ZkClient 第三方客户端
 */
public class ZkLock implements Lock {

    private static Logger logger = LoggerFactory.getLogger(ZkLock.class);

    /**
     * 利益临时顺序节点来实现分布式锁
     * 获取锁：取排队号（创建自己的临时顺序节点）,然后判断自己是否是最小号，
     * 如果是，则获得锁，不是，则注册前一个节点的watcher ,阻塞等待
     * 释放锁：删除自己创建的临时顺序节点
     */
    //根节点
    private static final String WORKSPACE = "/lock-workspace";
    //锁路径
    private String lockPath;
    //锁名称
    private String lockName;
    //第三方客户端
    private ZkClient client;
    //当前节点
    private ThreadLocal<String> currentPath = new ThreadLocal<String>();
    //前一个节点
    private ThreadLocal<String> beforePath = new ThreadLocal<String>();
    //锁重入计算
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<Integer>();

    public ZkLock(String lockName) {
        super();
        this.lockName = lockName;
        this.lockPath = WORKSPACE + "/" + lockName;
        //创建第三方客户端
        client = new ZkClient("192.168.0.88:2181");
        //设置 UTF-8 序列化器
        client.setZkSerializer(new ZkSerializer() {
            @Override
            public byte[] serialize(Object o) throws ZkMarshallingError {
                return String.valueOf(o).getBytes(Charsets.UTF_8);
            }

            @Override
            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                return new String(bytes, Charsets.UTF_8);
            }
        });
        //锁的根节点
        if (!this.client.exists(WORKSPACE)) {
            try {
                //创建持久节点（父节点）
                this.client.createPersistent(WORKSPACE);
            } catch (Exception e) {

            }
        }
    }

    @Override
    public void lock() {
        //尝试加锁
        if (!tryLock()) {
            //阻塞等待
            waitForLock();
            // TODO 递归，再次尝试加锁.(我认为这里是这个分布式锁最经典的设计) bac
            lock();
        }

    }
    //阻塞等待
    private void waitForLock() {
        String beforePath = this.beforePath.get();
        if (StringUtils.isEmpty(beforePath)) {
            logger.info("beforePath为空");
            return;
        }
        //倒计数器
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        //注册watcher
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                //数据改变通知。
            }
            @Override
            public void handleDataDeleted(String s) throws Exception {
                //数据删除通知。
                logger.info("监听到节点被删除，" + s);
                //减一
                countDownLatch.countDown();
            }
        };
        //订阅，监听前一个临时顺序节点
        this.client.subscribeDataChanges(this.beforePath.get(), listener);
        //怎么让自己阻塞
        if (this.client.exists(this.beforePath.get())) {
            try {
                //阻塞
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //醒来后，取消watcher
        this.client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        if (this.reentrantCount.get()!=null) {
            int count = this.reentrantCount.get();
            if (count > 0) {
                this.reentrantCount.set(++count);
                return true;
            }
        }
        if (this.currentPath.get() == null) {
            //TODO 临时顺序节点 (子节点) 设计包含锁名的临时顺序节点
            String newCurrentPath = this.client.createEphemeralSequential(this.lockPath, "lock");
            logger.info("newCurrentPath:" + newCurrentPath);
            this.currentPath.set(newCurrentPath);
        }
        //获得锁根节点下的，包含当前锁名（前缀）的所有的临时顺序的子节点,
        //sorted 排序
        List<String> workspaceChildren = this.client.getChildren(WORKSPACE);
        List<String> children = this.client.getChildren(WORKSPACE).stream().filter(s -> s.startsWith(this.lockName)).sorted().collect(Collectors.toList());
        if (CollectionUtils.isEmpty(children)){
            logger.info("children为空");
            logger.info("workspaceChildren:" + workspaceChildren.toString());
            //没有找到子节点
            String currentPath = this.currentPath.get();
            boolean exists = this.client.exists(currentPath);
            logger.info("当前节点是否存在：" + exists);
            workspaceChildren = this.client.getChildren(WORKSPACE);
            logger.info("再次获取子节点,workspaceChildren:" + workspaceChildren.toString());
            if (!StringUtils.isEmpty(currentPath)){
                //递归删除当前节点
                this.client.deleteRecursive(currentPath);
                logger.info("children为空,删除currentPath：" + currentPath);
                //删除当前线程变量
                this.currentPath.remove();
            }
            return false;
        }
        //判断当前节点是否是最小的
        boolean isFirst = this.currentPath.get().equals(WORKSPACE + "/" + children.get(0));
        if (isFirst) {
            this.reentrantCount.set(1);
            return true;
        }else {
            //得到当前临时顺序节点的索引号
            int curIndex = children.indexOf(this.currentPath.get().replace(WORKSPACE + "/", ""));
            if (0 == curIndex){
                //当前节点排第一。
                logger.info("当前节点排第一,currentPath:{},children:{}", this.currentPath.get(), children);
                return true;
            }
             logger.info("查看当前节点位置：currentPath:{},children:{}", this.currentPath.get(), children);
            //设置前一个节点，用来监听。
            // TODO （一旦被唤醒，就递归监听前一个。就算前一个是网络中断也不会出错.）
            beforePath.set(WORKSPACE + "/" + children.get(curIndex - 1));
        }
        return false;
    }



    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        //重入的释放锁处理
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            }else {
                this.reentrantCount.remove();
            }
        }
        //删除当前节点
        this.client.deleteRecursive(this.currentPath.get());
        this.currentPath.remove();
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    //测试
    public static void main(String[] args) {
        ZkLock lock = new ZkLock("distLock");
        //被并发修改的公共数据
        final Integer[] num = {0};

        long begin = System.currentTimeMillis();


        //并发数
        int currency = 5;
        //循环屏障
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(currency);
        //多线程模拟高并发
        for (int i = 0; i < currency; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    logger.info(Thread.currentThread().getName() + "——————我准备好——————");
                    //等待一起抢锁
                    try {
                        cyclicBarrier.wait();
                    } catch (Exception e) {

                    }

                    Random random = new Random();

                    for (int i1 = 0; i1 < 100; i1++) {
                        try {
                            lock.lock();
                            logger.info(Thread.currentThread().getName() + "————!——获得了锁,——！————" + num[0]);
//                            try {
//                                Thread.sleep(random.nextInt(300));
//                            } catch (InterruptedException e) {
//                                e.printStackTrace();
//                            }
                            num[0] = ++num[0];
                            logger.info(Thread.currentThread().getName() + "————!——执行加一操作,得到结果：——！————" + num[0]);
                        } finally {
                            lock.unlock();
                        }
                        if (i1==99){
                            long end = System.currentTimeMillis();
                            logger.info("总耗时：" + (end - begin));
                            //加锁操作500次，总耗时：3066, 每次获取锁的时间是：6.132毫秒
                        }
                    }
                }
            }).start();
        }
    }
}
